
Source S3: basic structure using file-based CDK #28786

Merged

merged 4 commits into master from file-cdk-source-s3 on Aug 1, 2023

Conversation

clnoll
Contributor

@clnoll clnoll commented Jul 27, 2023

What

Adds the basic structure for using the file-based CDK for syncing data from S3:

  • S3 implementation of the AbstractFileBasedStreamReader
  • S3-specific config

This includes a few changes to the file-based CDK.

  • I've removed the file_type attribute on RemoteFile because we don't actually know the type of the RemoteFile when we create it. We could theoretically guess based on the stream config, but there's no need to.
  • The AbstractFileBasedStreamReader's open_file method now takes a mode argument. This is passed in by the parser that calls open_file, which knows its associated file read mode (see the sketch below).
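
For illustration, here is a minimal sketch of the interface change described above (names may differ slightly from the CDK's exact code):

from abc import ABC, abstractmethod
from enum import Enum
from io import IOBase


class FileReadMode(Enum):
    READ = "r"
    READ_BINARY = "rb"


class RemoteFile:
    """A file discovered in remote storage; note there is no file_type attribute."""

    def __init__(self, uri: str, last_modified: str):
        self.uri = uri
        self.last_modified = last_modified


class AbstractFileBasedStreamReader(ABC):
    @abstractmethod
    def open_file(self, file: RemoteFile, mode: FileReadMode) -> IOBase:
        """Open the remote file in the mode requested by the calling parser."""
        ...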

The S3 integration tests will be updated in a separate phase, which will verify that both the old and new config work.

Recommended reading order

  1. file-based CDK changes
  2. S3-specific changes

@clnoll clnoll requested a review from a team as a code owner July 27, 2023 15:44
@octavia-squidington-iii octavia-squidington-iii added the area/connectors (Connector related issues), CDK (Connector Development Kit), and connectors/source/s3 labels Jul 27, 2023
@github-actions
Contributor

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file and made any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date (README.md, bootstrap.md, docs.md, etc.)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete but the CI check is failing:

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

@octavia-squidington-iii
Collaborator

source-s3 test report (commit 02cbc303c7) - ❌

⏲️ Total pipeline duration: 02mn13s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review; set your PR to draft mode to avoid flooding the CI engine and upstream services on subsequent commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=source-s3 test

Contributor

@brianjlai brianjlai left a comment

Mostly some clarifying questions about how some of the new constructs are being used, plus a few small things. Overall looks good!

- prefixes = {glob.split("*")[0].rstrip("/") for glob in globs}
- return list(filter(lambda x: bool(x), prefixes))
+ prefixes = {glob.split("*")[0] for glob in globs}
+ return set(filter(lambda x: bool(x), prefixes))
Contributor

How come we need to change this from a list to a set? I looked at how get_prefixes_from_globs is used in S3, and it looks like we just check it in an if statement followed by a for-each loop.

Contributor Author

We loop over the prefixes and submit a request to S3 for each one. I wanted to avoid submitting another request to S3 in the event that there were duplicates.
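
For context, here is the helper as changed in the diff above, with a small example of the deduplication it buys:

def get_prefixes_from_globs(globs):
    # Everything before the first wildcard is the prefix. Returning a set
    # deduplicates prefixes, so we don't submit a redundant request to S3
    # when multiple globs share the same prefix.
    prefixes = {glob.split("*")[0] for glob in globs}
    return set(filter(lambda x: bool(x), prefixes))


# Both globs share the prefix "data/", so only one S3 request is needed.
assert get_prefixes_from_globs(["data/*.csv", "data/*.jsonl"]) == {"data/"}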


@property
@abstractmethod
def file_read_mode(self) -> FileReadMode:
Contributor

I noticed file_read_mode is only used within the parser implementation file itself, so it's not strictly necessary to expose it in the interface. Is the idea of making it part of the interface to signal that future file parser types need to provide an implementation of it?

Contributor Author

Yes, exactly.
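
So a hypothetical new parser would be forced to declare its read mode along these lines (building on the FileReadMode sketch earlier; FileTypeParser stands in for the CDK's parser base class):

class CsvParser(FileTypeParser):
    @property
    def file_read_mode(self) -> FileReadMode:
        # CSV is a text format, so the stream reader opens it in "r" mode.
        return FileReadMode.READ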

@classmethod
def from_file_type(cls, file_type: str) -> "FileReadMode":
text_file_types = ["csv", "jsonl"]
binary_file_types = ["parquet", "avro"]
Contributor

How is this being called? I didn't see it used in the PR or when I searched in IntelliJ. My only concern is that these statically defined constants won't work well if we decide to move in the direction of custom file formats per source.

Contributor Author

Oops, that was a remnant of something that I ultimately deleted. Good catch, removed.

def documentation_url(cls) -> AnyUrl:
return AnyUrl("https://docs.airbyte.com/integrations/sources/s3", scheme="https")

bucket: str = Field(description="Name of the S3 bucket where the file(s) exist.", order=0)
Contributor

Nit: for consistency, let's give every field a title so we don't rely on pydantic's auto-capitalization.
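
i.e., something like this (an illustrative sketch; in the connector the model derives from AbstractFileBasedSpec rather than BaseModel):

from pydantic import BaseModel, Field


class Config(BaseModel):
    bucket: str = Field(
        title="Bucket",
        description="Name of the S3 bucket where the file(s) exist.",
        order=0,
    )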


@config.setter
def config(self, value: Config):
    assert isinstance(value, Config)
Contributor

@brianjlai brianjlai Jul 27, 2023

Let's add a comment here capturing our prior conversation about why we have to check this value for type safety. At face value, without context, it might seem redundant.

@clnoll I feel like this is worth discussing with other reviewers in the PR because it's probably one of the more confusing aspects. We're trying to force a specific subclass of AbstractFileBasedSpec (in this case the S3 Config), which makes sense, but it causes some type-check warnings since the AbstractFileBasedStreamReader interface is generalized. I'm not sure of a better solution either.

Contributor Author

Sure, @brianjlai, added a comment.

To give some context to other reviewers - the basic background here is that config information is needed by both the FileBasedSource and the StreamReader, but for different reasons; FileBasedSource needs it to configure streams, and the StreamReader needs it so that it knows how to, e.g. authenticate with the storage system.

In this PR, I'm allowing FileBasedSource to read the config from disk and parse it (like normal), and once parsed, the source sets the config on the StreamReader. However, the type of config accepted by FileBasedSource is AbstractFileBasedSpec, and it only cares about keys that are source-agnostic, whereas the StreamReader cares about keys that are specific to the 3rd party that it's reading from. For example, S3 will be looking for an aws_access_token_id key, but FileBasedSource won't itself need that.

So that leads us to this situation, where the S3 StreamReader's config setter requires a config of type (S3) Config, but the interface config setter takes an AbstractFileBasedSpec, which doesn't have S3-specific keys. To solve this, we assert that we were given the correct Config type for our type of StreamReader.

One alternative route that I went down involved reading in the config prior to the initialization of the Source, so that it could be given to the StreamReader as an argument. Since the Source requires a StreamReader, it could get the config off of that. This is somewhat undesirable because we end up reading the config twice (because AbstractSource still reads it deep within the CDK code), and it also deviates from the pattern of letting the Source validate the config, which may have error handling behavior that we want to keep. For those two reasons I'd prefer to keep the code as-is. But I'm open to other opinions.
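
A condensed sketch of the resulting pattern (stub classes stand in for the real CDK types):

from typing import Optional


class AbstractFileBasedSpec:  # stand-in for the CDK's source-agnostic spec
    pass


class Config(AbstractFileBasedSpec):  # the S3-specific spec
    pass


class SourceS3StreamReader:
    def __init__(self) -> None:
        self._config: Optional[Config] = None

    @property
    def config(self) -> Optional[Config]:
        return self._config

    @config.setter
    def config(self, value: AbstractFileBasedSpec) -> None:
        # The interface-level setter accepts any AbstractFileBasedSpec, but this
        # reader needs S3-specific keys (bucket, credentials, ...), so we assert
        # that we were handed the S3 Config subclass specifically.
        assert isinstance(value, Config)
        self._config = value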

return self._config

@config.setter
def config(self, value: Config):
Contributor

Curious, doesn't mypy flag this, since the interface method expects AbstractFileBasedSpec, even though Config is a subclass of it?

Contributor Author

Yeah, mypy doesn't seem to mind it.

total_n_keys = 0

try:
    kwargs = {"Bucket": self.config.bucket}
Contributor

Are we expecting other kwargs? I'm not sure I see the benefit of putting Bucket behind kwargs and passing it to _page; it just makes it more opaque what that dict contains in that method.

Also, a related nit: if we do use kwargs, I'm used to seeing it as the last parameter of the _page function.

Contributor Author

We add a ContinuationToken in _page, so I moved the creation of the kwargs dict there.
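
For reference, a minimal sketch of the pagination pattern under discussion, using boto3's list_objects_v2 (the wrapping function and its defaults are illustrative, not the connector's exact code):

import boto3


def list_all_objects(bucket: str, prefix: str = ""):
    s3 = boto3.client("s3")
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = s3.list_objects_v2(**kwargs)
        yield from response.get("Contents", [])
        # list_objects_v2 returns at most 1000 keys per call; keep paging
        # until the response is no longer truncated.
        if not response.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = response["NextContinuationToken"]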

for remote_file in self._page(s3, globs, kwargs, None, seen, logger):
    yield remote_file

logger.info(f"Finished listing objects from S3. Found {total_n_keys} objects total ({len(seen)} unique objects).")
Contributor

Although len(seen) would be correct for both flows, wouldn't total_n_keys always be 1 if there are no prefixes? And for prefixes, it would always log 0, since it never changes total_n_keys. We count total_n_keys_for_prefix within _page(), but we only print the extra log in that method.

What count are we trying to display here, and would this extra log be duplicative with the ones in _page()?

Contributor Author

Oops, yep, total_n_keys should have been incremented in each of the for remote_file in ... loops. Fixed.

@octavia-squidington-iii
Collaborator

source-s3 test report (commit f7fc7594a0) - ❌

⏲️ Total pipeline duration: 02mn23s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here


@clnoll clnoll force-pushed the file-cdk-source-s3 branch from f7fc759 to d6998cf on July 28, 2023 00:18
@octavia-squidington-iii
Collaborator

source-s3 test report (commit d6998cf10c) - ❌

⏲️ Total pipeline duration: 02mn21s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here


@octavia-squidington-iii
Collaborator

source-s3 test report (commit a32b9d2f71) - ❌

⏲️ Total pipeline duration: 02mn49s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here


@clnoll clnoll force-pushed the file-cdk-source-s3 branch from a32b9d2 to 391a948 on July 29, 2023 18:47
@octavia-squidington-iii octavia-squidington-iii added the area/documentation (Improvements or additions to documentation) label Jul 29, 2023
@clnoll clnoll force-pushed the file-cdk-source-s3 branch from 391a948 to 066e413 on July 29, 2023 18:53
@octavia-squidington-iii
Collaborator

source-s3 test report (commit 066e413dfd) - ❌

⏲️ Total pipeline duration: 04mn12s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here


@clnoll clnoll force-pushed the file-cdk-source-s3 branch from 066e413 to b7fc705 on July 29, 2023 20:08
@octavia-squidington-iii
Collaborator

source-s3 test report (commit b7fc705ccd) - ❌

⏲️ Total pipeline duration: 04mn11s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here


@clnoll clnoll force-pushed the file-cdk-source-s3 branch from b7fc705 to 15e9d16 on July 31, 2023 03:25
@octavia-squidington-iii
Collaborator

source-s3 test report (commit 15e9d16d35) - ❌

⏲️ Total pipeline duration: 03mn57s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here


@clnoll clnoll force-pushed the file-cdk-source-s3 branch from 15e9d16 to db7be2c on July 31, 2023 13:57
@octavia-squidington-iii
Collaborator

source-s3 test report (commit db7be2c028) - ❌

⏲️ Total pipeline duration: 02mn55s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install
Build source-s3 docker image for platform linux/x86_64
Unit tests

🔗 View the logs here


@clnoll
Contributor Author

clnoll commented Jul 31, 2023

FYI I split out the file-based CDK portion of this here: #28862

@clnoll clnoll requested a review from brianjlai July 31, 2023 19:16
@clnoll clnoll force-pushed the file-cdk-source-s3 branch from db7be2c to b05fd4a on August 1, 2023 03:35
@octavia-squidington-iii
Collaborator

source-s3 test report (commit b05fd4a7ce) - ❌

⏲️ Total pipeline duration: 01mn26s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install
Build source-s3 docker image for platform linux/x86_64
Unit tests

🔗 View the logs here


if __name__ == "__main__":
    args = sys.argv[1:]
    catalog_path = AirbyteEntrypoint.extract_catalog(args)
    source = FileBasedSource(SourceS3StreamReader(), Config, catalog_path)
Contributor

@brianjlai brianjlai Aug 1, 2023

Would it make sense to stand up the method stubs for the S3FileSource, instead of using FileBasedSource, in this PR? It could make the next set of PRs easier, so there aren't conflicts modifying that central class.

Contributor Author

Could do, but I'm not anticipating that we'll need new methods on FileBasedSource, so I wasn't planning to create a separate S3FileSource (most of the customization occurs in the objects that are passed into FileBasedSource). Does that make sense to you, or does it feel like there should be a dedicated S3FileSource?

Contributor

So I was sketching out the work for transforming the S3 legacy config into the new config. A lot of the config validation against the spec is done automatically in the entrypoint, so potentially the easiest way to transform legacy manifests before we perform that validation is to have an S3FileSource override BaseConnector.read_config().

I'm still toying with ideas about how best to do it, but so far that was the simplest way to avoid having to manage multiple specs and versions of the config. But if you don't think your work will require it, then we won't conflict anyway.
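
As a rough illustration of that idea (hypothetical subclass and helper names, not code from this PR):

from typing import Any, Mapping

from airbyte_cdk.connector import BaseConnector


class SourceS3(FileBasedSource):  # hypothetical subclass of the CDK's FileBasedSource
    @classmethod
    def read_config(cls, config_path: str) -> Mapping[str, Any]:
        config = BaseConnector.read_config(config_path)
        # _is_legacy_config and _transform_legacy_config are hypothetical
        # helpers that would detect and rewrite a legacy-format config
        # before spec validation runs.
        if _is_legacy_config(config):
            config = _transform_legacy_config(config)
        return config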

Contributor Author

Yeah what I'm doing now won't require it, so I think I'll let you make that change.

Contributor

@maxi297 maxi297 left a comment

LGTM!

from enum import Enum
from typing import Optional

from pydantic import BaseModel


class FileReadMode(Enum):
Contributor

This will need to be released before the airbyte-integrations change, as it otherwise won't be available to the source. Consequence: if someone needs to release source-s3 first, the tests will fail.

Contributor Author

This was actually moved in a separate PR, so I've deleted it here. This PR now consists solely of the S3 connector changes.

"endpoint_url": config.endpoint,
"use_ssl": True,
"verify": True,
"config": ClientConfig(s3={"addressing_style": "auto"}),
Contributor

I'm probably missing something: why do we set client_kv_args = {"config": client_config} if we update client_kv_args["config"] right after with ClientConfig(s3={"addressing_style": "auto"})?

Contributor Author

You're right, this is awkward (a remnant of copy/pasting & an incomplete refactor). Cleaned it up.
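
For reference, the cleaned-up shape builds the client kwargs once and passes them straight to boto3 (a sketch with an illustrative wrapping function, not the connector's exact code):

from typing import Optional

import boto3
from botocore.client import Config as ClientConfig


def make_s3_client(endpoint: Optional[str] = None):
    client_kwargs = {
        "endpoint_url": endpoint,
        "use_ssl": True,
        "verify": True,
        # Build the botocore config once instead of assigning a placeholder
        # value and overwriting it immediately afterwards.
        "config": ClientConfig(s3={"addressing_style": "auto"}),
    }
    return boto3.client("s3", **client_kwargs)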

@octavia-squidington-iii octavia-squidington-iii removed the CDK (Connector Development Kit) label Aug 1, 2023
@octavia-squidington-iii
Collaborator

source-s3 test report (commit 662eb54856) - ❌

⏲️ Total pipeline duration: 03mn11s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install
Build source-s3 docker image for platform linux/x86_64
Unit tests

🔗 View the logs here


@octavia-squidington-iii
Collaborator

source-s3 test report (commit 7c52e0ef6a) - ❌

⏲️ Total pipeline duration: 03mn43s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install
Build source-s3 docker image for platform linux/x86_64
Unit tests

🔗 View the logs here


@clnoll clnoll force-pushed the file-cdk-source-s3 branch from 7c52e0e to 1926f1a on August 1, 2023 15:57
@octavia-squidington-iii
Collaborator

source-s3 test report (commit 1926f1a912) - ❌

⏲️ Total pipeline duration: 03mn21s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install
Build source-s3 docker image for platform linux/x86_64
Unit tests

🔗 View the logs here


@clnoll clnoll force-pushed the file-cdk-source-s3 branch from 1926f1a to 56d8083 on August 1, 2023 16:25
@octavia-squidington-iii
Collaborator

source-s3 test report (commit 56d80836c9) - ✅

⏲️ Total pipeline duration: 17mn19s

Step Result
Validate airbyte-integrations/connectors/source-s3/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install
Build source-s3 docker image for platform linux/x86_64
Unit tests
Integration tests
Acceptance tests

🔗 View the logs here


@clnoll clnoll merged commit 57d3daf into master Aug 1, 2023
@clnoll clnoll deleted the file-cdk-source-s3 branch August 1, 2023 16:45
@clnoll
Contributor Author

clnoll commented Aug 3, 2023

/test connector=source-s3

⚠️ The test slash command is now deprecated.

The connector tests are automatically triggered as CI checks.

Please use /legacy-test if you need to test CDK or CAT changes.

Please post in the #pipeline-complaint-hotline Slack channel if something is not working as expected.

@clnoll
Contributor Author

clnoll commented Aug 3, 2023

/legacy-test connector=source-s3
